{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "view-in-colab"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/pathwaycom/pathway/blob/main/examples/notebooks/tutorials/custom_reducers.ipynb\" target=\"_parent\"><img src=\"https://pathway.com/assets/colab-badge.svg\" alt=\"Run In Colab\" class=\"inline\"/></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "notebook-instructions",
      "source": [
        "# Installing Pathway with Python 3.10+\n",
        "\n",
        "In the cell below, we install Pathway into a Python 3.10+ Linux runtime.\n",
        "\n",
        "> **If you are running in Google Colab, please run the colab notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.\n",
        "> \n",
        "> **The installation and loading time is less than 1 minute**.\n"
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "id": "pip-installation-pathway",
      "source": [
        "%%capture --no-display\n",
        "!pip install --prefer-binary pathway"
      ],
      "execution_count": null,
      "outputs": [],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "id": "logging",
      "source": [
        "import logging\n",
        "\n",
        "logging.basicConfig(level=logging.CRITICAL)"
      ],
      "execution_count": null,
      "outputs": [],
      "metadata": {}
    },
    {
      "cell_type": "markdown",
      "id": "2",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "source": [
        "# Writing Simple Stateful Reducer in Pathway\n",
        "\n",
        "Pathway supports natively aggregation using a wide range of [reducers](/developers/api-docs/reducers/), e.g., [`sum`](/developers/api-docs/reducers#pathway.reducers.sum), [`count`](/developers/api-docs/reducers#pathway.reducers.count), or [`max`](/developers/api-docs/reducers#pathway.reducers.max). However, those might not cover all the necessary ways of aggregating values. In this tutorial, you learn how to write reducers implementing custom logic.\n",
        "\n",
        "For example, let's implement a custom stateful `stdev` reducer that computes the standard deviation."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "3",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "outputs": [],
      "source": [
        "import pathway as pw\n",
        "\n",
        "# To use advanced features with Pathway Scale, get your free license key from\n",
        "# https://pathway.com/features and paste it below.\n",
        "# To use Pathway Community, comment out the line below.\n",
        "pw.set_license_key(\"demo-license-key-with-telemetry\")\n",
        "\n",
        "\n",
        "SHOW_DEBUG = False\n",
        "\n",
        "\n",
        "class StdDevAccumulator(pw.BaseCustomAccumulator):\n",
        "    def __init__(self, cnt, sum, sum_sq):\n",
        "        self.cnt = cnt\n",
        "        self.sum = sum\n",
        "        self.sum_sq = sum_sq\n",
        "\n",
        "    @classmethod\n",
        "    def from_row(cls, row):\n",
        "        [val] = row\n",
        "        if SHOW_DEBUG:\n",
        "            print(\"from_row()\")\n",
        "        return cls(1, val, val**2)\n",
        "\n",
        "    def update(self, other):\n",
        "        self.cnt += other.cnt\n",
        "        self.sum += other.sum\n",
        "        self.sum_sq += other.sum_sq\n",
        "        if SHOW_DEBUG:\n",
        "            print(\"update()\")\n",
        "\n",
        "    def compute_result(self) -> float:\n",
        "        mean = self.sum / self.cnt\n",
        "        mean_sq = self.sum_sq / self.cnt\n",
        "        if SHOW_DEBUG:\n",
        "            print(\"compute_result()\")\n",
        "        return mean_sq - mean**2\n",
        "\n",
        "\n",
        "stddev = pw.reducers.udf_reducer(StdDevAccumulator)"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "4",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "source": [
        "Above, the [`pw.BaseCustomAccumulator`](/developers/api-docs/pathway#pathway.BaseCustomAccumulator) class is used as a base for the `StdDevAccumulator`, which describes the logic of the underlying accumulator. The accumulator class requires a few methods:\n",
        "* [`from_row`](/developers/api-docs/pathway#pathway.BaseCustomAccumulator.from_row), which constructs an accumulator from the values of a single row of a table (here, a single value since our reducer applies to a single column),\n",
        "* [`update`](/developers/api-docs/pathway#pathway.BaseCustomAccumulator.update), which updates one accumulator by another accumulator,\n",
        "* [`compute_result`](/developers/api-docs/pathway#pathway.BaseCustomAccumulator.compute_result), which produces the output based on the accumulator state,\n",
        "* [`retract`](/developers/api-docs/pathway#pathway.BaseCustomAccumulator.retract), is an optional method, which processes negative updates,\n",
        "* [`neutral`](/developers/api-docs/pathway#pathway.BaseCustomAccumulator.neutral), is an optional method, which returns state corresponding to consuming 0 rows.\n",
        "\n",
        "Now, let's see the reducer in action."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "5",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "            | avg                | stddev\n",
            "^PWSRT42... | 25.933333333333334 | 3.3355555555555156\n"
          ]
        }
      ],
      "source": [
        "temperature_data = pw.debug.table_from_markdown(\n",
        "    \"\"\"\n",
        "date       |   temperature\n",
        "2023-06-06 |   28.0\n",
        "2023-06-07 |   23.1\n",
        "2023-06-08 |   24.5\n",
        "2023-06-09 |   26.0\n",
        "2023-06-10 |   28.3\n",
        "2023-06-11 |   25.7\n",
        "\"\"\"\n",
        ")\n",
        "\n",
        "temperature_statistics = temperature_data.reduce(\n",
        "    avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)\n",
        ")\n",
        "\n",
        "pw.debug.compute_and_print(temperature_statistics)"
      ]
    },
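    {
      "cell_type": "markdown",
      "id": "5-check",
      "metadata": {},
      "source": [
        "As a quick sanity check, the numbers above can be reproduced by hand. This is only a sketch: the symbols $n$, $S$, and $Q$ are shorthand introduced here for the fields `cnt`, `sum`, and `sum_sq`, not Pathway names. For values $x_1, \\dots, x_n$, the accumulator holds\n",
        "\n",
        "$$\n",
        "S = \\sum_{i=1}^{n} x_i, \\qquad Q = \\sum_{i=1}^{n} x_i^2,\n",
        "$$\n",
        "\n",
        "and `compute_result` returns\n",
        "\n",
        "$$\n",
        "\\frac{Q}{n} - \\left(\\frac{S}{n}\\right)^2 = \\frac{1}{n}\\sum_{i=1}^{n}\\left(x_i - \\frac{S}{n}\\right)^2 .\n",
        "$$\n",
        "\n",
        "For the six temperatures, $n = 6$, $S = 155.6$, and $Q = 4055.24$, so the average is $155.6 / 6 \\approx 25.9333$ and\n",
        "\n",
        "$$\n",
        "\\frac{4055.24}{6} - \\left(\\frac{155.6}{6}\\right)^2 \\approx 675.8733 - 672.5378 \\approx 3.3356 .\n",
        "$$\n",
        "\n",
        "This quantity is the population variance; its square root, roughly $1.83$, is the standard deviation."
      ]
    },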
    {
      "cell_type": "markdown",
      "id": "6",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "source": [
        "However, with this logic, our reducer is not smartly processing negative updates: it starts the computation from scratch whenever a negative update is encountered.\n",
        "You can see this in action by enabling debug information and processing table where row removal happens. Let's insert several values at time 0 and then remove one already inserted value and add another at time 2."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "7",
      "metadata": {},
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "from_row()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "compute_result()\n",
            "compute_result()\n",
            "            | avg               | stddev\n",
            "^PWSRT42... | 25.96666666666667 | 3.3255555555555247\n"
          ]
        }
      ],
      "source": [
        "SHOW_DEBUG = True\n",
        "temperature_data_with_updates = pw.debug.table_from_markdown(\n",
        "    \"\"\"\n",
        "date       |   temperature | __time__ | __diff__\n",
        "2023-06-06 |   28.0        |        0 |        1\n",
        "2023-06-07 |   23.1        |        0 |        1\n",
        "2023-06-08 |   24.5        |        0 |        1\n",
        "2023-06-09 |   26.0        |        0 |        1\n",
        "2023-06-10 |   28.3        |        0 |        1\n",
        "2023-06-11 |   25.7        |        0 |        1\n",
        "2023-06-11 |   25.7        |        2 |       -1\n",
        "2023-06-11 |   25.9        |        2 |        1\n",
        "\"\"\"\n",
        ")\n",
        "\n",
        "temperature_statistics_with_updates = temperature_data_with_updates.reduce(\n",
        "    avg=pw.reducers.avg(pw.this.temperature), stddev=stddev(pw.this.temperature)\n",
        ")\n",
        "\n",
        "pw.debug.compute_and_print(temperature_statistics_with_updates)"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "8",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "source": [
        "It can be alleviated by extending our reducer and providing a method for processing negative updates."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "9",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "outputs": [],
      "source": [
        "class ImprovedStdDevAccumulator(StdDevAccumulator):\n",
        "    def retract(self, other):\n",
        "        self.cnt -= other.cnt\n",
        "        self.sum -= other.sum\n",
        "        self.sum_sq -= other.sum_sq\n",
        "        if SHOW_DEBUG:\n",
        "            print(\"retract()\")\n",
        "\n",
        "\n",
        "improved_stddev = pw.reducers.udf_reducer(ImprovedStdDevAccumulator)"
      ]
    },
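    {
      "cell_type": "markdown",
      "id": "9-retract",
      "metadata": {},
      "source": [
        "A rough sketch of why subtracting the fields is enough, writing $n$, $S$, and $Q$ for `cnt`, `sum`, and `sum_sq` (shorthand used only in this note, not Pathway names): removing a value $y$ and inserting a value $z$, in either order, takes the state from\n",
        "\n",
        "$$\n",
        "(n,\\; S,\\; Q) \\quad \\text{to} \\quad (n,\\; S - y + z,\\; Q - y^2 + z^2).\n",
        "$$\n",
        "\n",
        "For the table with updates above, $y = 25.7$ and $z = 25.9$, so $S$ goes from $155.6$ to $155.8$ and $Q$ from $4055.24$ to $4065.56$. The result becomes\n",
        "\n",
        "$$\n",
        "\\frac{4065.56}{6} - \\left(\\frac{155.8}{6}\\right)^2 \\approx 677.5933 - 674.2678 \\approx 3.3256,\n",
        "$$\n",
        "\n",
        "without revisiting the five rows that did not change."
      ]
    },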
    {
      "cell_type": "markdown",
      "id": "10",
      "metadata": {
        "lines_to_next_cell": 0
      },
      "source": [
        "And now you can test the improved reducer in action."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "11",
      "metadata": {},
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "from_row()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "update()\n",
            "from_row()\n",
            "retract()\n",
            "compute_result()\n",
            "compute_result()\n",
            "            | avg               | stddev\n",
            "^PWSRT42... | 25.96666666666667 | 3.3255555555555247\n"
          ]
        }
      ],
      "source": [
        "\n",
        "temperature_statistics_improved = temperature_data_with_updates.reduce(\n",
        "    avg=pw.reducers.avg(pw.this.temperature),\n",
        "    stddev=improved_stddev(pw.this.temperature),\n",
        ")\n",
        "\n",
        "pw.debug.compute_and_print(temperature_statistics_improved)"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "12",
      "metadata": {},
      "source": [
        "In the example above, 10x calls to `update()` and 12x calls to `from_row()` are replaced with 6x calls to `update()`, 1x call to `retract()` and 8x calls to `from_row()`.\n",
        "\n",
        "This comes from the fact that former reducer:\n",
        "* had to call `from_row()` for each row of the table, wrapping each single value into separate `StdDevAccumulator` object,\n",
        "* had to call `update()` for each row of the table except the first consumed,\n",
        "* had to restart from scratch after the update to the table, thus it had to pay the cost twice.\n",
        "\n",
        "While the latter reducer aggregated the table at time 0 in the same way as former one, but processed the update differently:\n",
        "* had to wrap both delete and insert updates with `from_row()` calls\n",
        "* called once `retract()` and once `update()`."
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3 (ipykernel)",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.11.11"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}